Python & Hadoop | Spark Ecosystem

I. Accessing MySQL from Python

1. Install the pymysql module

1) In IDEA, type import pymysql; if the module is not installed, press Option + Return to install it (a pip alternative is shown below).
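
If you prefer the command line, pymysql can also be installed with pip:

cmd>pip install pymysql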

2. Test MySQL access

See whether we can print the MySQL version.

#!/usr/bin/python3
# -*-coding:utf-8-*-

import pymysql

try:
    # open the connection
    conn = pymysql.connect(host='localhost', user='root', passwd='123',
                           db='python', port=3306, charset='utf8')
    # open a cursor
    cur = conn.cursor()

    # execute the SQL
    cur.execute('select version()')

    version = cur.fetchone()

    print(version)
    cur.close()
    conn.close()

except Exception as e:
    print("exception occurred:", e)

3. Query MySQL

#!/usr/bin/python3
# -*-coding:utf-8-*-

import pymysql

try:
    # open the connection
    conn = pymysql.connect(host='localhost', user='root', passwd='123',
                           db='python', port=3306, charset='utf8')
    # open a cursor
    cur = conn.cursor()

    sql = "select id, name, age from t1 where name like 'tom8%'"

    # execute the SQL
    cur.execute(sql)

    # fetch the results
    all = cur.fetchall()

    for rec in all:
        print(rec)
        # print(str(rec[0]))

    conn.commit()  # not strictly needed for a SELECT
    cur.close()
    conn.close()

except Exception as e:
    print("exception occurred:", e)

4. Bulk insert into MySQL

#!/usr/bin/python3
# -*-coding:utf-8-*-

import pymysql

try:
    # open the connection
    conn = pymysql.connect(host='localhost', user='root', passwd='123',
                           db='python', port=3306, charset='utf8')
    # open a cursor
    cur = conn.cursor()

    i = 0
    while i < 10000:
        sql = "insert into t1(name, age) VALUES ('%s', %d)" % ('tom' + str(i), i % 100)
        print(sql)
        cur.execute(sql)
        i += 1

    conn.commit()
    cur.close()
    conn.close()

except Exception as e:
    print("exception occurred:", e)

5. Execute a transaction

Turn off autocommit.

#!/usr/bin/python3
# -*-coding:utf-8-*-

import pymysql

try:
    # open the connection
    conn = pymysql.connect(host='localhost', user='root', passwd='123', db='python', port=3306, charset='utf8')

    # turn off autocommit
    conn.autocommit(False)

    # begin the transaction
    conn.begin()
    # open a cursor
    cur = conn.cursor()

    # delete
    sql = "delete from t1 WHERE id > 20000"
    # update
    sql = "update t1 set age = age - 1 where age >= 50"

    # aggregate (sql is reassigned above, so only this last statement is executed)
    sql = "select count(*) from t1 where age < 20"

    # execute the SQL
    cur.execute(sql)

    # statements that return rows must be fetched after execution
    res = cur.fetchone()
    print(res[0])

    # commit the transaction
    conn.commit()
    # close the cursor
    cur.close()

except Exception as e:
    print("exception occurred:", e)
    conn.rollback()

finally:
    conn.close()
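
Because sql is reassigned three times above, only the final SELECT actually runs. A minimal sketch of executing all three statements atomically inside a single transaction (same t1 table and connection settings assumed):

#!/usr/bin/python3
# -*-coding:utf-8-*-

import pymysql

conn = pymysql.connect(host='localhost', user='root', passwd='123',
                       db='python', port=3306, charset='utf8')
conn.autocommit(False)
try:
    conn.begin()
    cur = conn.cursor()
    cur.execute("delete from t1 where id > 20000")
    cur.execute("update t1 set age = age - 1 where age >= 50")
    cur.execute("select count(*) from t1 where age < 20")
    print(cur.fetchone()[0])
    conn.commit()    # the delete and the update become visible together
    cur.close()
except Exception as e:
    conn.rollback()  # on failure neither change is applied
    print(e)
finally:
    conn.close()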

II. Operating HBase from Python in a Spark Environment

1. Environment preparation

0. Start the HBase cluster
---------------------------------------------------------------------
If the cluster clocks are out of sync:
$>su root
$>xcall.sh "ntpdate asia.pool.ntp.org"
You can also set up a script to sync the clocks automatically; see my hadoopHA environment setup notes for details.

1. From the HBase directory, start HBase's thrift server so that third-party applications can talk to it (a quick connectivity check is sketched after this list).
---------------------------------------------------------------------
$>hbase-daemon.sh start thrift2

2. Check the Web UI
---------------------------------------------------------------------
# 9095 is the web UI port, 9090 is the RPC port
http://cs1:9095/

3. Install the thrift compiler on macOS
---------------------------------------------------------------------
brew install thrift

4. Download and install the thrift Python module.
---------------------------------------------------------------------
import thrift ==> if it is missing, press Option + Return to install it

5. Locate the hbase.thrift file and compile it to generate the Python files.
---------------------------------------------------------------------
Compile with the following command:
cmd> thrift -o ./out -gen py hbase.thrift
The generated files are placed here: /Users/shixuanji/Documents/资源/Jar包/HBase/out/gen-py

6. Create a new module in IDEA
---------------------------------------------------------------------

7. Create the Python file Demo1.py
---------------------------------------------------------------------

8. Copy the generated Python files into the IDEA project.
---------------------------------------------------------------------
mythrift/hbase/..

9. Test the environment from the console
---------------------------------------------------------------------
Remove the spark/conf/core-site.xml | hdfs-site.xml | hive-site.xml files
[scala] ~/apps/spark/bin
<spark-shell>
val rdd = sc.makeRDD(1 to 10)
rdd.map(e=>(e,1))

[python] ~/apps/spark/bin
<pyspark>
arr = [1,2,3,4]
rdd = sc.parallelize(arr)
rdd.map(lambda e : (e,1))
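
Before moving on to the full example in 2.1, a quick connectivity check against the thrift2 server can save debugging time. A minimal sketch (host cs1 and RPC port 9090 assumed from the steps above):

# open and close a buffered transport to confirm the thrift2 server is reachable
from thrift.transport import TSocket
from thrift.transport import TTransport

transport = TTransport.TBufferedTransport(TSocket.TSocket('cs1', 9090))
transport.open()
print("thrift2 server reachable")
transport.close()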

2. Code examples

2.1 HBase CRUD (put, get, delete, scan)

https://github.com/airpoet/bigdata/blob/master/Spark_Project/HBasePythonDemo/BasicPyHbase.py

# -*- encoding=utf-8 -*-

import os

# import the thrift python modules
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# import the hbase python modules generated by the thrift compiler
from mythrift.hbase import THBaseService
from mythrift.hbase.ttypes import *
from mythrift.hbase.ttypes import TResult


# create a socket connection to cs1:9090
transport = TSocket.TSocket('cs1', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)

# open the transport; it stays open for all of the operations below
transport.open()

# put
table = b'ns1:t1'
row = b'row1'
v1 = TColumnValue(b'f1', b'id', b'101')
v2 = TColumnValue(b'f1', b'name', b'tomas')
v3 = TColumnValue(b'f1', b'age', b'12')
vals = [v1, v2, v3]
put = TPut(row, vals)
client.put(table, put)
print("put ok")


# get
table = b'ns1:t1'
rowkey = b"row1"
col_id = TColumn(b"f1", b"id")
col_name = TColumn(b"f1", b"name")
col_age = TColumn(b"f1", b"age")

cols = [col_id, col_name, col_age]
get = TGet(rowkey, cols)
res = client.get(table, get)
print(bytes.decode(res.columnValues[0].qualifier))
print(bytes.decode(res.columnValues[0].family))
print(res.columnValues[0].timestamp)
print(bytes.decode(res.columnValues[0].value))


# delete
table = b'ns1:t1'
rowkey = b"row1"
col_id = TColumn(b"f1", b"id")
col_name = TColumn(b"f1", b"name")
col_age = TColumn(b"f1", b"age")
cols = [col_id, col_name]

# build the delete object
delete = TDelete(rowkey, cols)
res = client.deleteSingle(table, delete)
print("delete ok")


# scan
table = b'ns1:t12'
startRow = b'1530357094900-43dwMLjxI5-0'
stopRow = b'1530357183537-43dwMLjxI5-6'
payload = TColumn(b"f1", b"payload")

cols = [payload]

scan = TScan(startRow=startRow, stopRow=stopRow, columns=cols)
# without stopRow the scan runs to the end of the table (this overrides the scan above)
scan = TScan(startRow=startRow, columns=cols)
r = client.getScannerResults(table, scan, 100)
for x in r:
    print("============")
    print(bytes.decode(x.columnValues[0].qualifier))
    print(bytes.decode(x.columnValues[0].family))
    print(x.columnValues[0].timestamp)
    print(bytes.decode(x.columnValues[0].value))

# close the transport once all operations are done
transport.close()

2.2 Store crawled web pages in HBase

https://github.com/airpoet/bigdata/blob/master/Spark_Project/HBasePythonDemo/Crawler2HBase.py

 CrawerPageDao.py 

#!/usr/bin/python3
# -*-coding:utf-8-*-

import os

# import the thrift python modules
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# import the hbase python modules generated by the thrift compiler
from mythrift.hbase import THBaseService
from mythrift.hbase.ttypes import *
from mythrift.hbase.ttypes import TResult

import base64

'''
create a socket connection to cs1:9090
'''
transport = TSocket.TSocket('cs1', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)



'''
save a web page
'''
def savePage(url, page):
    # open the connection
    transport.open()
    # base64-encode the url into bytes and use it as the rowkey
    urlBase64Bytes = base64.encodebytes(url.encode("utf-8"))

    # put
    table = b'ns1:pages'
    rowkey = urlBase64Bytes
    v1 = TColumnValue(b'f1', b'page', page)
    vals = [v1]
    put = TPut(rowkey, vals)
    client.put(table, put)
    transport.close()


'''
check whether a page already exists
'''
def exists(url):
    transport.open()
    # base64-encode the url into bytes and use it as the rowkey
    urlBase64Bytes = base64.encodebytes(url.encode("utf-8"))
    print(urlBase64Bytes)

    table = b'ns1:pages'
    rowkey = urlBase64Bytes
    col_page = TColumn(b"f1", b"page")

    cols = [col_page]
    get = TGet(rowkey, cols)
    res = client.get(table, get)
    transport.close()
    return res.row is not None

 Crawler2HBase.py 

#!/usr/bin/python3
# -*-coding:utf-8-*-


# first create the hbase table: pages
# $hbase> create 'ns1:pages','f1'

import urllib.request
import os
import re
import CrawerPageDao

# download a web page
def download(url):
    resp = urllib.request.urlopen(url)
    pageBytes = resp.read()
    resp.close()

    # save the page if it has not been downloaded yet
    if not CrawerPageDao.exists(url):
        CrawerPageDao.savePage(url, pageBytes)

    try:
        # decode the page content
        pageStr = pageBytes.decode("utf-8")
        # extract href addresses
        pattern = u'<a[^>]*href="([^"]*?)"'
        res = re.finditer(pattern, pageStr)
        for r in res:
            addr = r.group(1)
            print(addr)
            if addr.startswith("//"):
                addr = addr.replace("//", "http://", 1)

            # skip the page's own address and pages we already have
            if addr.startswith("http://") and url != addr and (not CrawerPageDao.exists(addr)):
                download(addr)

    except Exception as e:
        print(e)
        print(pageBytes.decode("gbk", errors='ignore'))
        return

download("http://jd.com")

III. Spark data analysis with Python

This follows the book below; the data sets and other materials can be downloaded from there.

Apache Spark 2 for Beginners

1. Environment preparation

The current Python environment must have the following packages installed. They are already on my Mac, so I will not reinstall them here.

1.numpy
cmd>pip install -i https://pypi.tuna.tsinghua.edu.cn/simple numpy

2.scipy
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple scipy

3.matplotlib
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple matplotlib
python -m pip install -U pip setuptools
python -m pip install matplotlib
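
A quick way to confirm the three packages are importable (just a sanity check, not required by the book):

import numpy, scipy, matplotlib
print(numpy.__version__, scipy.__version__, matplotlib.__version__)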

2. Spark on macOS

You can also do this from a terminal inside a Linux desktop environment.

It is best to avoid Chinese characters in the directory path, otherwise you may see warnings or even errors.

My pyspark lives here: /Users/shixuanji/Documents/资源/Jar包/Spark/spark-2.1.3-bin-hadoop2.7/bin/pyspark

Open iTerm2 and start pyspark.

# imports
from pyspark.sql import Row
import matplotlib.pyplot as plt
import numpy as np
import pylab as P
plt.rcdefaults() # Restore the rc params from Matplotlib's internal defaults
dataDir ="file:///home/centos/ml-data/ml-1m/users.dat"
lines = sc.textFile(dataDir)
splitLines = lines.map(lambda l: l.split("::"))
usersRDD = splitLines.map(lambda p: Row(id=p[0],gender=p[1],age=int(p[2]), occupation=p[3], zipcode=p[4]))
usersDF = spark.createDataFrame(usersRDD)
usersDF.createOrReplaceTempView("users")
usersDF.show()


# histogram of user ages
ageDF = spark.sql("SELECT age FROM users")
ageList = ageDF.rdd.map(lambda p: p.age).collect()
ageDF.describe().show()

plt.hist(ageList)
plt.title("Age distribution of the users\n")
plt.xlabel("Age")
plt.ylabel("Number of users")
plt.show(block=False)

# density plot
from scipy.stats import gaussian_kde
density = gaussian_kde(ageList)
xAxisValues = np.linspace(0,100,1000)
density.covariance_factor = lambda : .5
density._compute_covariance()
plt.title("Age density plot of the users\n")
plt.xlabel("Age")
plt.ylabel("Density")
plt.plot(xAxisValues, density(xAxisValues))
plt.show(block=False)

# nested subplots: histogram plus box plot
plt.subplot(121)
plt.hist(ageList)
plt.title("Age distribution of the users\n")
plt.xlabel("Age")
plt.ylabel("Number of users")
plt.subplot(122)
plt.title("Summary of distribution\n")
plt.xlabel("Age")
plt.boxplot(ageList, vert=False)
plt.show(block=False)

# horizontal bar chart: top 10 occupations
occ10 = spark.sql("SELECT occupation, count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount DESC LIMIT 10")
occ10.show()

occTuple = occ10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
occList, countList = zip(*occTuple)
occList

y_pos = np.arange(len(occList))
plt.barh(y_pos, countList, align='center', alpha=0.4)
plt.yticks(y_pos, occList)
plt.xlabel('Number of users')
plt.title('Top 10 user types\n')
plt.gcf().subplots_adjust(left=0.15)
plt.show(block=False)


# stacked bar chart: gender distribution by occupation
occGender = spark.sql("SELECT occupation, gender FROM users")
occGender.show()

occCrossTab = occGender.stat.crosstab("occupation","gender")
occupationsCrossTuple = occCrossTab.rdd.map(lambda p:(p.occupation_gender,p.M, p.F)).collect()
occList, mList, fList = zip(*occupationsCrossTuple)
N = len(occList)
ind = np.arange(N)
width = 0.75
p1 = plt.bar(ind, mList, width, color='r')
p2 = plt.bar(ind, fList, width, color='y', bottom=mList)
plt.ylabel('Count')
plt.title('Gender distribution by occupation\n')
plt.xticks(ind + width/2., occList, rotation=90)
plt.legend((p1[0], p2[0]), ('Male', 'Female'))
plt.gcf().subplots_adjust(bottom=0.25)
plt.show(block=False)

# pie chart: bottom 10 occupations
occupationsBottom10 = spark.sql("SELECT occupation,count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount LIMIT 10")
occupationsBottom10Tuple = occupationsBottom10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
occupationsBottom10List, countBottom10List =zip(*occupationsBottom10Tuple)
explode = (0, 0.3, 0.2, 0.15,0.1,0,0,0,0,0.1)
plt.pie(countBottom10List, explode=explode,labels=occupationsBottom10List, autopct='%1.1f%%', shadow=True,startangle=90)
plt.title('Bottom 10 user types\n')
plt.show(block=False)